package query

import (
	"context"
	"fmt"

	"github.com/influxdata/flux"
	"github.com/influxdata/flux/execute"
	"github.com/influxdata/flux/memory"
	"github.com/influxdata/flux/plan"
	"github.com/influxdata/influxdb/v2/kit/platform"
	"github.com/influxdata/influxdb/v2/storage/reads/datatypes"
	"github.com/influxdata/influxdb/v2/tsdb/cursors"
)

// StorageReader is an interface for reading tables from the storage subsystem.
type StorageReader interface {
	ReadFilter(ctx context.Context, spec ReadFilterSpec, alloc memory.Allocator) (TableIterator, error)
	ReadGroup(ctx context.Context, spec ReadGroupSpec, alloc memory.Allocator) (TableIterator, error)
	ReadWindowAggregate(ctx context.Context, spec ReadWindowAggregateSpec, alloc memory.Allocator) (TableIterator, error)

	ReadTagKeys(ctx context.Context, spec ReadTagKeysSpec, alloc memory.Allocator) (TableIterator, error)
	ReadTagValues(ctx context.Context, spec ReadTagValuesSpec, alloc memory.Allocator) (TableIterator, error)

	ReadSeriesCardinality(ctx context.Context, spec ReadSeriesCardinalitySpec, alloc memory.Allocator) (TableIterator, error)
	SupportReadSeriesCardinality(ctx context.Context) bool

	Close()
}

type ReadFilterSpec struct {
	OrganizationID platform.ID
	BucketID       platform.ID

	Bounds    execute.Bounds
	Predicate *datatypes.Predicate
}

type ReadGroupSpec struct {
	ReadFilterSpec

	GroupMode GroupMode
	GroupKeys []string

	AggregateMethod string
}

func (spec *ReadGroupSpec) Name() string {
	return fmt.Sprintf("readGroup(%s)", spec.AggregateMethod)
}

type ReadTagKeysSpec struct {
	ReadFilterSpec
}

type ReadTagValuesSpec struct {
	ReadFilterSpec
	TagKey string
}

type ReadSeriesCardinalitySpec struct {
	ReadFilterSpec
}

// ReadWindowAggregateSpec defines the options for WindowAggregate.
//
// Window and the WindowEvery/Offset should be mutually exclusive. If you set either the WindowEvery or Offset with
// nanosecond values, then the Window will be ignored.
type ReadWindowAggregateSpec struct {
	ReadFilterSpec
	WindowEvery int64
	Offset      int64
	Aggregates  []plan.ProcedureKind
	CreateEmpty bool
	TimeColumn  string
	Window      execute.Window

	// ForceAggregate forces all aggregates to be treated as aggregates.
	// This forces selectors, which normally don't return values for empty
	// windows, to return a null value.
	ForceAggregate bool
}

func (spec *ReadWindowAggregateSpec) Name() string {
	var agg string
	if len(spec.Aggregates) > 0 {
		agg = string(spec.Aggregates[0])
	}
	return fmt.Sprintf("readWindow(%s)", agg)
}

// TableIterator is a table iterator that also keeps track of cursor statistics from the storage engine.
type TableIterator interface {
	flux.TableIterator
	Statistics() cursors.CursorStats
}

type GroupMode int

const (
	// GroupModeNone merges all series into a single group.
	GroupModeNone GroupMode = iota
	// GroupModeBy produces a table for each unique value of the specified GroupKeys.
	GroupModeBy
)

// ToGroupMode accepts the group mode from Flux and produces the appropriate storage group mode.
func ToGroupMode(fluxMode flux.GroupMode) GroupMode {
	switch fluxMode {
	case flux.GroupModeNone:
		return GroupModeNone
	case flux.GroupModeBy:
		return GroupModeBy
	default:
		panic(fmt.Sprint("unknown group mode: ", fluxMode))
	}
}
